package com.stay4it.rxjava;

import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.observables.ConnectableObservable;

/**
 * RxJava连接操作符 
 * 
 * http://blog.csdn.net/jdsjlzx/article/details/53365839
 * 
 * 
 */
public class RxJava11 {

	
	/**
	 * Publish 
	 * Publish 操作符将普通的Observable转换为可连接的Observable（ConnectableObservable）
	 * ConnectableObservable是Observable的子类。 
	 * @throws InterruptedException 
	 */
	private static void test1() throws InterruptedException {
		Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
		//使用publish操作符将普通Observable转换为可连接的Observable
		ConnectableObservable<Long> connectableObservable = observable.publish();
		
		//第一个订阅者订阅，不会开始发射数据
		connectableObservable.subscribe(new Subscriber<Long>() {
			@Override
            public void onCompleted() {
                System.out.println("1.onCompleted");
            }
            @Override
            public void onError(Throwable e) {
            	System.out.println("1.onError");
            }
            @Override
            public void onNext(Long value) {
            	System.out.println("1.onNext value :"+ value);
            }
		});
		
		//如果不调用connect方法，connectableObservable则不会发射数据
		connectableObservable.connect();
		//第二个订阅者延迟2s订阅，这将导致丢失前面2s内发射的数据
		connectableObservable
		        .delaySubscription(2, TimeUnit.SECONDS)// 0、1数据丢失
		        .subscribe(new Subscriber<Long>() {
		            @Override
		            public void onCompleted() {
		                System.out.println("2.onCompleted");
		            }
		            @Override
		            public void onError(Throwable e) {
		            	System.out.println("2.onError");
		            }
		            @Override
		            public void onNext(Long value) {
		            	System.out.println("2.onNext value :"+ value);
		            }
		        });
		
		Thread.sleep(6000); 
	}
	
	/**
	 * connect 
	 * 解释：connect是ConnectableObservable接口的一个方法，它的作用就是让ConnectableObservable
	 * 开始发射数据（即使没有任何订阅者订阅这个Observable，调用connect都会开始发射数据）。 
	 * @throws InterruptedException 
	 * 
	 */
	private static void test2() throws InterruptedException {
		
		Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
		//使用publish操作符将普通Observable转换为可连接的Observable
		ConnectableObservable<Long> connectableObservable = observable.publish();
		//开始发射数据,如果不调用connect方法，connectableObservable则不会发射数据
		Subscription subscription = connectableObservable.connect();
		
		//第一个订阅者延迟2s订阅，这将导致丢失前面2s内发射的数据
		connectableObservable
		        .delaySubscription(2, TimeUnit.SECONDS)// 0、1数据丢失
		        .subscribe(new Subscriber<Long>() {
		            @Override
		            public void onCompleted() {
		                System.out.println("onCompleted");
		            }
		            @Override
		            public void onError(Throwable e) {
		            	System.out.println("onError");
		            }
		            @Override
		            public void onNext(Long value) {
		            	System.out.println("onNext value :"+ value);
		            }
		        });
		
		//5秒后取消订阅
		Observable.interval(1, TimeUnit.SECONDS)
		.take(5)
		.subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted2");
                subscription.unsubscribe();//取消订阅
            }
            @Override
            public void onError(Throwable e) {
            	System.out.println("onError2");
            }
            @Override
            public void onNext(Long along) {
            	System.out.println("onNext2:"+along);
            }
        });
		
		Thread.sleep(10000); 
	}
	
	/**
	 * refCount 
	 * 解释：让一个可连接的Observable行为像普通的Observable。 
	 * 
	 * 可连接的Observable (connectable Observable)与普通的Observable差不多，
	 * 不过它并不会在被订阅时开始发射数据，而是直到使用了Connect操作符时才会开始。
	 * 用这种方法，你可以在任何时候让一个Observable开始发射数据。
	 * 
	 * refCount操作符还有一个变体：share()。有兴趣的可以自己去查找资料学习下。
	 * 
	 * @throws InterruptedException 
	 */
	private static void test3() throws InterruptedException {
		//创建一个可连接的Observable
		ConnectableObservable<Long> connectableObservable = Observable.interval(1, TimeUnit.SECONDS).take(6)
				.publish();
		//第一个订阅者
		connectableObservable.subscribe(new Subscriber<Long>() {
		    @Override
		    public void onCompleted() {
		        System.out.println("onCompleted1.");
		    }

		    @Override
		    public void onError(Throwable e) {
		        System.out.println("onError1: " + e.getMessage());
		    }

		    @Override
		    public void onNext(Long along) {
		        System.out.println("onNext1: " + along);
		    }
		});
		
		//第二个订阅者
		connectableObservable.delaySubscription(3, TimeUnit.SECONDS)//延时订阅
		.subscribe(new Subscriber<Long>() {
		    @Override
		    public void onCompleted() {
		        System.out.println("onCompleted2.");
		    }

		    @Override
		    public void onError(Throwable e) {
		        System.out.println("onError2: " + e.getMessage());
		    }

		    @Override
		    public void onNext(Long along) {
		        System.out.println("onNext2: " + along);
		    }
		});
		
		//如果不调用connect方法，connectableObservable则不会发射数据
		connectableObservable.connect();
		
		System.out.println("------after refCount()------");

		Observable<Long> observable = connectableObservable.refCount();
		
		//第三个订阅者
		observable.subscribe(new Subscriber<Long>() {
		    @Override
		    public void onCompleted() {
		        System.out.println("onCompleted3.");
		    }

		    @Override
		    public void onError(Throwable e) {
		        System.out.println("onError3: " + e.getMessage());
		    }

		    @Override
		    public void onNext(Long along) {
		        System.out.println("onNext3: " + along);
		    }
		});

		//第四个订阅者
		observable.delaySubscription(3, TimeUnit.SECONDS)
		.subscribe(new Subscriber<Long>() {
		    @Override
		    public void onCompleted() {
		        System.out.println("onCompleted4.");
		    }

		    @Override
		    public void onError(Throwable e) {
		        System.out.println("onError4: " + e.getMessage());
		    }

		    @Override
		    public void onNext(Long along) {
		        System.out.println("onNext4: " + along);
		    }
		});
		Thread.sleep(8000); 
	}
	
	/**
	 * replay 
	 * 使用Replay操作符返回的ConnectableObservable 会缓存订阅者订阅之前已经发射的数据，这样即使有订阅者在其发射数据开始之后进行订阅也能收到之前发射过的数据。
	 * Replay操作符能指定缓存的大小或者时间，这样能避免耗费太多内存。
	 * @throws InterruptedException 
	 * 
	 */
	private static void test4() throws InterruptedException {
		//创建一个可连接的Observable
		ConnectableObservable<Long> connectableObservable = Observable.interval(1, TimeUnit.SECONDS).take(5)
				.publish();
		//如果不调用connect方法，connectableObservable则不会发射数据
		connectableObservable.connect();
		//第一个订阅者
		connectableObservable.delaySubscription(3, TimeUnit.SECONDS)//延时订阅
		.subscribe(new Subscriber<Long>() {
		    @Override
		    public void onCompleted() {
		        System.out.println("onCompleted1.");
		    }

		    @Override
		    public void onError(Throwable e) {
		        System.out.println("onError1: " + e.getMessage());
		    }

		    @Override
		    public void onNext(Long along) {
		        System.out.println("onNext1: " + along);
		    }
		});

		//创建一个可连接的Observable
		ConnectableObservable<Long> connectableObservable2 = Observable.interval(1, TimeUnit.SECONDS).take(6)
				.replay(1);//这里不在使用publish，replay(1)缓存1个数据
		
		//如果不调用connect方法，connectableObservable则不会发射数据
		connectableObservable2.connect();
		//第二个订阅者
		connectableObservable2.delaySubscription(3, TimeUnit.SECONDS)//延时订阅
		.subscribe(new Subscriber<Long>() {
		    @Override
		    public void onCompleted() {
		        System.out.println("onCompleted2.");
		    }

		    @Override
		    public void onError(Throwable e) {
		        System.out.println("onError2: " + e.getMessage());
		    }

		    @Override
		    public void onNext(Long along) {
		        System.out.println("onNext2: " + along);
		    }
		});
		
		//创建一个可连接的Observable
		ConnectableObservable<Long> connectableObservable3 = Observable.interval(1, TimeUnit.SECONDS).take(6)
				.replay(3, TimeUnit.SECONDS);//这里不在使用publish，replay(3, TimeUnit.SECONDS)缓存3s内的数据
		
		//如果不调用connect方法，connectableObservable则不会发射数据
		connectableObservable3.connect();
		//第三个订阅者
		connectableObservable3.delaySubscription(3, TimeUnit.SECONDS)//延时订阅
		.subscribe(new Subscriber<Long>() {
		    @Override
		    public void onCompleted() {
		        System.out.println("onCompleted3.");
		    }

		    @Override
		    public void onError(Throwable e) {
		        System.out.println("onError3: " + e.getMessage());
		    }

		    @Override
		    public void onNext(Long along) {
		        System.out.println("onNext3: " + along);
		    }
		});
		
		
		Thread.sleep(8000); 
	}
	
	public static void main(String[] args) throws InterruptedException {
		test4();

	}

}
